package com.zhang.socket;

import com.zhang.dao.Human;
import com.zhang.service.message.MessageServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@ServerEndpoint(value = "/test/oneToMany/{nickname}")
@Service
@Slf4j
public class OneToManyWebSocket {
    // 记录连接数
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    // 存放所有客户端
    private static Map<String, Human> clients = new ConcurrentHashMap<>();

    //要注入的service或者dao
    private static MessageServiceImpl messageService;


    @Autowired
    public void setMessageServiceImpl(MessageServiceImpl messageServiceImpl) {
        OneToManyWebSocket.messageService = messageServiceImpl;
    }

    // 连接建立调用
    @OnOpen
    public void onOpen(Session session, @PathParam("nickname") String nickname) {
        onlineCount.incrementAndGet(); // 在线数加1
        Human human = new Human(session, nickname);
        clients.put(session.getId(), human);

        this.sendMessage(String.valueOf(onlineCount.get()),"count", session, true);
        this.sendMessage(nickname + "加入会话", "msg", session, false);
        log.info("{} 加入会话，当前在线人数为：{}", nickname, onlineCount.get());
    }

    // 连接关闭调用
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet(); // 在线数减1

        Human human = clients.get(session.getId());

        this.sendMessage(String.valueOf(onlineCount.get()),"count", session, false);
        this.sendMessage(human.getName() + "离开会话", "msg", session, false);

        clients.remove(session.getId());
        log.info("有一连接关闭：{}，当前在线人数为：{}", session.getId(), onlineCount.get());
    }

    // 处理客户端发来的消息
    @OnMessage
    public void onMessage(String message, Session session) {
        // 将消息存入kafka
        if(!"".equals(message) && message != null){

            messageService.sendUserMassageAndNameToAll( "msgꅄ" + clients.get(session.getId()).getName() + "ꅄ" + message);
        }

        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        this.sendMessage(message, "msg", session, false);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    // 发送消息
    private void sendMessage(String message, String prefix, Session fromSession, boolean toMe) {
        Human human = clients.get(fromSession.getId());
        String nickName = human.getName();

        if(prefix.equals("msg")) {
            prefix += "ꅄ" + nickName;
        }else {
            prefix += "ꅄ";
        }
        for (Map.Entry<String, Human> sessionEntry : clients.entrySet()) {
            Session toSession = sessionEntry.getValue().getSession();

            if (!toSession.getId().equals(fromSession.getId())) {
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                toSession.getAsyncRemote().sendText( prefix + ": " + message);
            }else if(toMe){
                // 给不给自己发消息
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                toSession.getAsyncRemote().sendText( prefix + ": " + message);
            }
        }
    }
}
